/**
 * FileName: FilterChain
 * Author:   SAMSUNG-PC 孙中军
 * Date:     2018/3/1 10:40
 * Description: Filter chain
 */
package cn.com.bonc.filter;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
 * A fluent chain of {@link Filter}s that are applied, in the order they were added,
 * to a source {@code Dataset<Row>}.
 *
 * <p>Typical usage: obtain a chain with {@link #setSourceData(Dataset)}, register
 * filters with {@link #addFilter(Filter)}, then call {@link #execute()}.
 */
public class FilterChain {

   /** Filters to apply, kept in insertion order. */
   private final List<Filter> filters = new ArrayList<>();

   /** The unfiltered source data; this reference is never reassigned by the chain. */
   private final Dataset<Row> rowDataset;

   private FilterChain(Dataset<Row> rowDataset) {
      this.rowDataset = rowDataset;
   }

   /**
    * Creates a new, empty filter chain over the given source data.
    *
    * @param rowDataset the source data the chain will filter
    * @return a new chain with no filters registered
    */
   public static FilterChain setSourceData(Dataset<Row> rowDataset) {
      return new FilterChain(rowDataset);
   }

   /**
    * Appends a filter to the end of the chain.
    *
    * @param filter the filter to append; must not be {@code null}
    * @return this chain, to allow call chaining
    * @throws NullPointerException if {@code filter} is {@code null}
    */
   public FilterChain addFilter(Filter filter) {
      // Fail here rather than later inside execute(), where the cause would be harder to trace.
      filters.add(Objects.requireNonNull(filter, "filter"));
      return this;
   }

   /**
    * Runs every registered filter in insertion order, feeding each filter the output
    * of the previous one, starting from the source data.
    *
    * <p>The intermediate result is held in a local variable, so each call starts again
    * from the original source data instead of from the output of an earlier call.
    *
    * @return the output of the last filter, or the source data itself when no filter
    *         has been added
    */
   public Dataset<Row> execute() {
      Dataset<Row> result = rowDataset;
      for (Filter filter : filters) {
         result = filter.doFilter(result);
      }
      return result;
   }
}